package me.zingon.open.bigsmart.service;

import lombok.extern.slf4j.Slf4j;
import me.zingon.open.bigsmart.constant.JOB_STATUS;
import me.zingon.open.bigsmart.model.Job;
import me.zingon.open.bigsmart.queue.DelayJobBucket;
import me.zingon.open.bigsmart.queue.JobPoll;
import me.zingon.open.bigsmart.queue.ReadyQueue;
import me.zingon.open.bigsmart.thread.BucketTimer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Entry point for the delay-queue operations visible in this class: registering a
 * delayed job, handing a ready job to a consumer, and removing a finished job.
 *
 * <p>All work is delegated to the injected {@link DelayJobBucket}, {@link JobPoll}
 * and {@link ReadyQueue} collaborators; this class only sequences the calls.
 *
 * @author ztc 1423047407@qq.com
 * @version 1.0
 * @date 2020-8-6 13:17
 */
@Service
@Slf4j
public class OperationService {
    @Autowired
    DelayJobBucket delayJobBucket;

    @Autowired
    JobPoll jobPoll;

    @Autowired
    ReadyQueue readyQueue;

    // Not referenced by any method in this class; kept because the field is
    // package-visible and may be used elsewhere.
    @Autowired
    BucketTimer bucketTimer;


    /**
     * Registers a new delayed job.
     *
     * <p>The job is stamped with the delay status, a creation time of "now" and an
     * execution time of "now + delay", then added to the delay bucket and to the
     * job pool, in that order.
     *
     * @param job the job to register; {@code null} is rejected
     * @return {@code true} if both the delay bucket and the job pool accepted the job,
     *         {@code false} if {@code job} is {@code null} or either add failed
     */
    public boolean addJob(Job job) {
        if (job == null) {
            log.warn("addJob called with a null job, ignored");
            return false;
        }

        // Read the clock once so that execTime - createTime is exactly the delay.
        long now = System.currentTimeMillis();
        job.setStatus(JOB_STATUS.DEALY.getCode());
        job.setExecTime(job.getDelay() + now);
        job.setCreateTime(now);

        if (!delayJobBucket.add(job)) {
            log.warn("Failed to add job to the delay bucket: {}", job);
            return false;
        }
        if (!jobPoll.add(job)) {
            // NOTE(review): at this point the bucket has already accepted the job, so it
            // may now reference a job with no metadata. No bucket removal API is visible
            // from this class, so the inconsistency is only reported here.
            log.warn("Job was added to the delay bucket but not to the job pool: {}", job);
            return false;
        }
        log.info("添加新延时任务：{}",job);
        return true;

    }

    /**
     * Pops the next ready job id for a topic and reserves the corresponding job.
     *
     * <p>On success the job is marked as reserved, its execution time is moved to
     * "now + TTR", and it is written back to the job pool and the delay bucket.
     * Presumably the bucket entry lets the job be retried if the consumer does not
     * complete it within the TTR; confirm against the bucket timer.
     *
     * @param topic the topic to take a job from
     * @return the reserved job, or {@code null} if the ready queue is empty for the
     *         topic, the job metadata cannot be found, or the stored job is not in
     *         the ready state
     */
    public Job getReadyJob(String topic) {
        String id = readyQueue.pop(topic);
        if (id == null) {
            return null;
        }

        Job job = jobPoll.getById(topic, id);
        if (job == null) {
            log.warn("No job metadata found for ready id {} in topic {}", id, topic);
            return null;
        }

        if (!job.getStatus().equals(JOB_STATUS.READY.getCode())) {
            log.info("job元信息 状态不是 ready,是：{}",job);
            return null;
        }

        jobPoll.changeStatus(job.getTopic(), job.getId(), JOB_STATUS.RESERVED);
        // Keep the in-memory copy in sync with the stored status. Without this, the
        // re-add below (and the value returned to the caller) would still carry the
        // ready status and could overwrite the reservation just recorded.
        job.setStatus(JOB_STATUS.RESERVED.getCode());

        job.setExecTime(System.currentTimeMillis() + job.getTtr());

        // Update the metadata's execution time and re-schedule the job in the bucket.
        if (!jobPoll.add(job)) {
            log.warn("Failed to update job pool metadata for reserved job: {}", job);
        }
        if (!delayJobBucket.add(job)) {
            log.warn("Failed to re-add reserved job to the delay bucket: {}", job);
        }

        return job;
    }

    /**
     * Removes a completed job from the job pool.
     *
     * @param topic the topic the job belongs to
     * @param key   the key of the job to remove
     * @return the result reported by the job pool's remove operation
     */
    public boolean rmComplete(String topic, String key) {
        return jobPoll.rm(topic, key);
    }
}
